package org.example.listener

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler.{StreamInputInfo, StreamingListener, StreamingListenerBatchStarted}
import org.example.common.Logging
import org.example.constant.ApolloConst
import org.example.utils.KafkaUtil

import scala.collection.mutable.ArrayBuffer

/**
  * Streaming listener that publishes a Kafka warning when batches fall behind schedule.
  *
  * At the start of every batch the scheduling delay reported by Spark is compared with the
  * configured batch interval. Once the delay exceeds `maxDelayedBatches` intervals, a JSON
  * warning (application info, batch time, delay, record count, input topics) is sent to the
  * `sparkWarn` topic.
  *
  * NOTE(review): the previous check divided the delay by the batch's epoch timestamp
  * (`batchInfo.batchTime.milliseconds`), so the integer division always produced 0 and the
  * alert could never fire. The defaults below (60 s interval, 10 intervals) mirror the literals
  * of the original code; confirm they match the deployed batch interval or pass explicit values.
  *
  * @param ssc                  streaming context, read once at construction for application
  *                             id, name and start time
  * @param batchIntervalSeconds batch interval of the monitored job, in seconds
  * @param maxDelayedBatches    number of batch intervals the scheduling delay may reach before
  *                             a warning is sent
  */
class BatchProcessListener(ssc: StreamingContext,
                           batchIntervalSeconds: Long = 60L,
                           maxDelayedBatches: Long = 10L)
  extends StreamingListener with Logging {

  require(batchIntervalSeconds > 0, "batchIntervalSeconds must be positive")
  require(maxDelayedBatches >= 0, "maxDelayedBatches must not be negative")

  /** Kafka topic that receives the warning messages. */
  private val alertTopic: String = "sparkWarn"

  /** Scheduling delay (milliseconds) above which a batch is reported as blocked. */
  private val maxSchedulingDelayMs: Long = maxDelayedBatches * batchIntervalSeconds * 1000L

  // Envelope reused for every warning; the application-level fields never change.
  private val message = new JSONObject()
  message.put("applicationId", ssc.sparkContext.applicationId)
  message.put("applicationName", ssc.sparkContext.appName)
  message.put("startTime", ssc.sparkContext.startTime)

  /**
    * Checks the scheduling delay of the batch that just started and sends a warning to Kafka
    * when it exceeds the configured threshold.
    *
    * @param batchStarted event carrying the batch info (batch time, delay, input metadata)
    */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    val batchInfo = batchStarted.batchInfo
    // The delay is undefined until processing has started; treat that as "no delay".
    val schedulingDelay: Long = batchInfo.schedulingDelay.getOrElse(0L)

    if (schedulingDelay > maxSchedulingDelayMs) { // job is blocked: raise a warning
      val warn = new JSONObject()
      warn.put("batchTime", batchInfo.batchTime.milliseconds)
      warn.put("schedulingDelay", schedulingDelay)
      warn.put("numRecords", batchInfo.numRecords)
      val topics = batchInfo.streamIdToInputInfo.values.flatMap(topicsOf).toSeq.distinct
      warn.put("topics", topics.mkString(","))

      message.put("isError", false)
      message.put("errorMessage", "")
      message.put("warnMessage", warn)

      val producer: KafkaProducer[String, String] = KafkaUtil.getKafkaProducer(ApolloConst.bootstrap)
      try {
        producer.send(new ProducerRecord[String, String](alertTopic, message.toJSONString))
      } finally {
        // Close even when send throws so the producer is not leaked.
        producer.close()
      }
    }
  }

  /**
    * Extracts the Kafka topics of one input stream from its "offsets" metadata entry.
    * Streams without that entry (or with an empty offset list) contribute no topics
    * instead of failing the whole warning.
    */
  private def topicsOf(inputInfo: StreamInputInfo): Iterable[String] =
    inputInfo.metadata.get("offsets") match {
      case Some(ranges: Iterable[_]) => ranges.collect { case range: OffsetRange => range.topic }
      case _ => Nil
    }
}
